package com.atguigu.flink.cep;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.OutputTag;

import java.util.List;
import java.util.Map;

/**
 * Created by Smexy on 2023/3/3
 *
 *
 *   迟到的数据无法处理，可以使用侧流接收，再进行处理！
 */
public class Demo2_HandleLate
{
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
            .<WaterSensor>forMonotonousTimestamps()
            .withTimestampAssigner( (e, ts) -> e.getTs());

        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> ds = env
            .socketTextStream("hadoop103", 8888)
            .map(new WaterSensorMapFunction())
            .assignTimestampsAndWatermarks(watermarkStrategy);

        OutputTag<WaterSensor> outputTag = new OutputTag<>("late", TypeInformation.of(WaterSensor.class));


        Pattern<WaterSensor, WaterSensor> pattern = Pattern.<WaterSensor>begin("s1")
            .where(new SimpleCondition<WaterSensor>()
            {
                //对数据进行匹配,返回true是匹配到的数据
                @Override
                public boolean filter(WaterSensor value) throws Exception {
                    return "s1".equals(value.getId());
                }
            });

        PatternStream<WaterSensor> patternStream = CEP.pattern(ds, pattern)
                                                        //把迟到的数据输出到侧流
                                                       .sideOutputLateData(outputTag);

        SingleOutputStreamOperator<String> ds2 = patternStream.select(new PatternSelectFunction<WaterSensor, String>()
        {
            @Override
            public String select(Map<String, List<WaterSensor>> pattern) throws Exception {
                return pattern.toString();
            }
        });

        //处理正常匹配的数据
        ds2.print();
        ds2.getSideOutput(outputTag).printToErr();




        try {
                            env.execute();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }

    }
}
